Support TsfileDataFrame #765

Open
ycycse wants to merge 4 commits into apache:develop from ycycse:tsdf

Conversation

@ycycse
Member

@ycycse ycycse commented Apr 2, 2026

This PR introduces TsFileDataFrame, which can read multiple TsFiles for model-training use through a dataframe-like interface.

https://apache-iotdb-project.feishu.cn/docx/SenJdxlbuoUS5Uxmq7jcOUzdnob?from=from_copylink

Build Wheel

Prerequisite: build the C++ library first, since the Python package depends on the shared library from cpp/target/build.

From the repo root:

mvn -P with-cpp clean install -DskipTests

cd python
python setup.py build_ext --inplace
python setup.py bdist_wheel

pip install dist/*.whl

Validation

  Basic dataset regression tests:

  pytest -q python/tests/test_tsfile_dataset.py

Basic Usage

Open one TsFile

  from tsfile import TsFileDataFrame

  tsdf = TsFileDataFrame("/path/to/file.tsfile")
  print(tsdf)

Open a shard directory

TsFileDataFrame will recursively collect all .tsfile files under the directory.

  from tsfile import TsFileDataFrame

  tsdf = TsFileDataFrame("/path/to/dataset_dir")
  print(len(tsdf))
  print(tsdf.list_timeseries()[:5])
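For illustration, the recursive shard discovery described above could be sketched with pathlib; the helper name is hypothetical and the real logic lives inside TsFileDataFrame:

```python
from pathlib import Path

def discover_tsfiles(root: str) -> list:
    """Recursively collect .tsfile paths under a directory, sorted for
    deterministic shard ordering (illustrative sketch only)."""
    return sorted(str(p) for p in Path(root).rglob("*.tsfile"))
```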

Filter by metadata

  filter_df = tsdf[tsdf['field'] == 'weather']
  print(filter_df)

Access one logical series

By index:

  series = tsdf[0]
  print(series)
  print(series.name)
  print(series.stats)
  print(series[0])
  print(series[:10])

By logical series path:

  series = tsdf["weather.device_a.temperature"]
  print(series.timestamps)

Build subset views

  subset = tsdf[:10]
  print(subset)

  subset2 = tsdf[[0, 3, 5]]
  print(subset2.list_timeseries())

Time-aligned multi-series query

  aligned = tsdf.loc[0:1000, [0, 1]]
  print(aligned)
  print(aligned.timestamps)
  print(aligned.values)
  print(aligned.series_names)

Or use logical series names:

  aligned = tsdf.loc[
      0:1000,
      ["weather.device_a.temperature", "weather.device_a.humidity"],
  ]

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a new Python-side “dataframe-like” API for reading TsFile table-model data across multiple TsFile shards, enabling series discovery, per-series access, and time-aligned multi-series queries for model training workflows.

Changes:

  • Added TsFileSeriesReader to discover series (table + tags + field) and read data via Arrow batch queries.
  • Added TsFileDataFrame (plus Timeseries / AlignedTimeseries) to unify multiple TsFiles, merge overlapping series, and support .loc aligned queries.
  • Exported the new classes from python/tsfile/__init__.py.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File / Description
  • python/tsfile/tsfile_series_reader.py — New Arrow-based series reader with metadata discovery, timestamp caching, and range/time reads.
  • python/tsfile/tsfile_dataframe.py — New multi-file dataframe-like abstraction with series selection, merged metadata, and .loc alignment.
  • python/tsfile/__init__.py — Re-export TsFileDataFrame, Timeseries, and AlignedTimeseries from the package.


Comment on lines +402 to +407
if arrow_table.num_rows > 0:
    ts_list.append(arrow_table.column('time').to_numpy())
    for fc in field_columns:
        field_lists[fc].append(
            arrow_table.column(fc).to_numpy().astype(np.float64)
        )

Copilot AI Apr 2, 2026


In _read_arrow, values are always converted via to_numpy().astype(np.float64). This will fail for FIELD columns that are not numeric (e.g., STRING/TEXT/BLOB/DATE) and can also fail when the Arrow array contains nulls (writing code allows NaN values, which become nulls on read). Consider filtering field_columns up-front to numeric TSDataType only, and/or using Arrow casting + null-to-NaN handling before converting to NumPy (and keeping non-numeric columns out of this reader).
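A minimal sketch of the suggested fix, assuming columns arrive as Python sequences with None for nulls; the helper names are illustrative, not the PR's actual API:

```python
import numpy as np

NUMERIC_KINDS = ("i", "u", "f")  # signed int, unsigned int, float

def is_numeric_column(values) -> bool:
    """Check whether a column's values have a numeric dtype (sketch)."""
    return np.asarray(values).dtype.kind in NUMERIC_KINDS

def to_float64_with_nan(values) -> np.ndarray:
    """Convert a column to float64, mapping None/null entries to NaN
    instead of raising during the cast."""
    return np.array(
        [np.nan if v is None else float(v) for v in values],
        dtype=np.float64,
    )
```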

Contributor


+1

Member Author


Done. The implementation has since been refactored into python/tsfile/dataset/reader.py. The dataset reader now only includes numeric FIELD columns during discovery, and non-numeric fields are excluded from the dataset path. The current read path also keeps an explicit numeric-compatibility check when converting Arrow arrays.

Comment on lines +280 to +285
if series_path in self._series_data_cache:
    return self._series_data_cache[series_path][start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]


Copilot AI Apr 2, 2026


read_series_range() indexes timestamps[start] and timestamps[end - 1] without validating start/end (including start == end, negative indices, or end > length). This currently raises confusing IndexErrors and makes empty slices impossible. Add explicit range validation and return an empty result for start >= end to match the docstring’s [start, end) semantics.

Suggested change

-if series_path in self._series_data_cache:
-    return self._series_data_cache[series_path][start:end].tolist()
-info = self.series_info[series_path]
-timestamps = self._timestamps_cache[series_path]
+if start < 0 or end < 0:
+    raise ValueError("start and end indices must be non-negative")
+# Handle cached series data first, using [start, end) semantics
+if series_path in self._series_data_cache:
+    data = self._series_data_cache[series_path]
+    length = len(data)
+    if start >= length:
+        return []
+    if end > length:
+        end = length
+    if start >= end:
+        return []
+    return data[start:end].tolist()
+info = self.series_info[series_path]
+timestamps = self._timestamps_cache[series_path]
+length = len(timestamps)
+if start >= length:
+    return []
+if end > length:
+    end = length
+if start >= end:
+    return []

Member Author


done

Comment on lines +171 to +183
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
    tuple(all_tags[tc][i] for tc in tag_columns)
    for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
    self._register_tag_group(
        table_name, tag_columns, ut,
        field_columns, all_times[mask]

Copilot AI Apr 2, 2026


The multi-tag grouping path builds tag_tuples in Python and then, for each unique tuple, constructs a full boolean mask with a Python loop ([t == ut for t in tag_tuples]). This is O(n * unique_tags) and will become a bottleneck for large tables. Consider using a vectorized approach (e.g., structured NumPy array + np.unique(..., return_inverse=True) or pandas/groupby) to compute groups and indices in (near) linear time.

Suggested change

-# Multiple tag columns: use structured approach
-# Convert to list of tuples for grouping
-n = len(all_times)
-tag_tuples = [
-    tuple(all_tags[tc][i] for tc in tag_columns)
-    for i in range(n)
-]
-unique_tuples = list(dict.fromkeys(tag_tuples))
-for ut in unique_tuples:
-    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
-    self._register_tag_group(
-        table_name, tag_columns, ut,
-        field_columns, all_times[mask]
+# Multiple tag columns: use structured NumPy array for grouping
+n = len(all_times)
+# Build a structured array with one field per tag column
+dtype = [(tc, all_tags[tc].dtype) for tc in tag_columns]
+structured_tags = np.empty(n, dtype=dtype)
+for tc in tag_columns:
+    structured_tags[tc] = all_tags[tc]
+# Find unique tag combinations and an inverse index for grouping
+unique_vals, inverse = np.unique(
+    structured_tags, return_inverse=True
+)
+# Group rows by unique tag combination using the inverse index
+for group_id, ut in enumerate(unique_vals):
+    mask = inverse == group_id
+    tag_values = tuple(ut[tc] for tc in tag_columns)
+    self._register_tag_group(
+        table_name,
+        tag_columns,
+        tag_values,
+        field_columns,
+        all_times[mask],

Contributor


For tables with multiple tag columns, grouping is done by:

  1. Building a Python list of tuples (line 174-177): O(n)
  2. For each unique tuple, scanning the entire list to build a boolean mask (line 180): O(n * k) where k = number of unique groups

For a table with 1M rows and 1000 tag groups, this becomes ~1 billion comparisons in pure Python.

Member Author


Done. The implementation has moved to python/tsfile/dataset/reader.py. The pure-Python tuple/mask grouping path was replaced with a vectorized NumPy grouping implementation based on structured arrays and np.unique(..., return_inverse=True).

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.



Comment on lines +128 to +139
def show(self, max_rows: Optional[int] = None):
    """Print formatted table with configurable row limit.

    Args:
        max_rows: Maximum rows to display. None for all rows.
    """
    n_rows, n_cols = self.values.shape
    if n_rows == 0:
        print(f"AlignedTimeseries(0 rows, {n_cols} series)")
        return
    ts_strs, ts_width, col_widths, val_strs = self._build_display()
    print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows))
Contributor


Pass max_rows to _build_display?

Member Author


done. The implementation has moved to python/tsfile/dataset/formatting.py and python/tsfile/dataset/timeseries.py. All formatting now goes through the shared display logic.

n_rows = len(ts_strs)
n_cols = len(col_widths)

header_parts = ['timestamp'.rjust(ts_width)]
Contributor


The column is a datetime string, so calling it timestamp is misleading.
Consider time or datetime instead.

Member Author


Done. The implementation has moved to python/tsfile/dataset/formatting.py. The aligned display header now uses time instead of timestamp.

col_widths = []
val_strs = []
for col_idx in range(n_cols):
    col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}'
Contributor


In what case will col_xxx be used?

Member Author


The implementation has moved to python/tsfile/dataset/formatting.py. This is now only a defensive fallback for mismatched metadata during formatting. In normal public paths it should not be used, since series_names and value columns are expected to match.

Comment on lines +74 to +83
def _build_display(self):
    """Pre-compute string representations for display."""
    n_rows, n_cols = self.values.shape
    ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
    ts_width = max((len(s) for s in ts_strs), default=0)
    ts_width = max(ts_width, len('timestamp'))

    col_widths = []
    val_strs = []
    for col_idx in range(n_cols):
Contributor


Is it possible to concatenate timestamps and values, then use ndarray's print method, if any?

Member Author


The implementation has moved to python/tsfile/dataset/formatting.py. I kept the custom formatter intentionally, because it gives us stable head/tail truncation, NaN rendering, and consistent column formatting, which ndarray printing does not provide well for this API.

Comment on lines +260 to +262
# Deduplicate by timestamp (keep first occurrence)
_, unique_idx = np.unique(merged_ts, return_index=True)
return merged_ts[unique_idx], merged_vals[unique_idx]
Contributor


Why keep the first occurrence?

Member Author


Done. The implementation has moved to python/tsfile/dataset/merge.py. The current merge policy rejects duplicate timestamps across shards instead of keeping the first occurrence.
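A sketch of such a duplicate-rejecting merge, under the assumption that shard timestamps arrive as NumPy arrays; the function name is hypothetical, not the merge.py API:

```python
import numpy as np

def merge_shard_timestamps(ts_a: np.ndarray, ts_b: np.ndarray) -> np.ndarray:
    """Merge timestamps from two shards, rejecting duplicates across
    shards instead of silently keeping the first occurrence (sketch)."""
    merged = np.concatenate([ts_a, ts_b])
    uniq, counts = np.unique(merged, return_counts=True)
    if np.any(counts > 1):
        dup = uniq[counts > 1][0]
        raise ValueError(f"duplicate timestamp across shards: {dup}")
    return uniq  # np.unique already returns a sorted array
```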

Comment on lines +94 to +105
table_schemas = self._reader.get_all_table_schemas()
if not table_schemas:
    raise ValueError("No tables found in TsFile")

self.series_paths = []
table_names = list(table_schemas.keys())

# Progress tracking
total_rows = 0

for ti, table_name in enumerate(table_names):
    table_schema = self._reader.get_table_schema(table_name)
Contributor


Why not use table_schemas?

Member Author


done.

Comment on lines +112 to +113
if col_name.lower() == 'time':
    continue
Contributor


Also check column category?

Member Author


done

Comment on lines +122 to +130
# Query TAG columns + first FIELD column to discover groups and timestamps
query_cols = tag_columns + [field_columns[0]]

time_arrays = []
tag_arrays = {tc: [] for tc in tag_columns}

with self._reader.query_table_batch(
    table_name, query_cols, batch_size=65536
) as rs:
Contributor


A TsFile may not actually contain a column's data even if it is in the schema.
Double-check if timestamps can be read in this scenario.

Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing tags repeatedly puts heavy pressure on memory.
Consider fetching all DeviceIds first and deriving the tag values from them.

Member Author


Done. The implementation has moved to python/tsfile/dataset/metadata.py, python/tsfile/dataset/reader.py, and python/tsfile/dataset/dataframe.py. Shared metadata is now centered on table/device entries, and logical series are represented by lightweight refs instead of repeated per-series tag dictionaries.


Comment on lines +419 to +429
def cache_series_data(self, series_path: str):
    """Pre-load series data into memory cache.

    Args:
        series_path: Time series path.
    """
    if series_path not in self.series_info:
        raise ValueError(f"Series not found: {series_path}")
    if series_path not in self._series_data_cache:
        data = self.read_series(series_path)
        self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
Contributor


The method is not used? And there does not seem to be any memory control.

Member Author


It's an old method. We don't need it now.

    raise ValueError(f"Series not found: {series_path}")
if series_path not in self._series_data_cache:
    data = self.read_series(series_path)
    self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
Contributor


read_series returns float64 values (via _read_arrow which casts to np.float64), but cache_series_data stores them as float32. When read_series is later called on a cached series, it returns float32 values silently, which can lose precision.
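The precision loss described here is easy to demonstrate with a float32 round-trip, using only the standard library:

```python
import struct

# float64 keeps a 53-bit significand; float32 only 24 bits, so integers
# above 2**24 do not survive a float32 round-trip.
exact = 16777217.0  # 2**24 + 1, exactly representable in float64
round_tripped = struct.unpack("f", struct.pack("f", exact))[0]
print(round_tripped)  # silently rounds to 16777216.0
```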

Member Author


Done. The old cached-series path was removed during the refactor, so this precision inconsistency no longer exists.

Comment on lines +316 to +317
if idx < 0 or idx >= len(self._df._series_list):
    raise IndexError(f"Series index {idx} out of range")
Contributor


Negative indices are not normalized (no idx = length + idx like other __getitem__ methods in this file). A user passing df.loc[:, [-1]] would get an IndexError instead of the last series.

Member Author


Done. The implementation has moved to python/tsfile/dataset/dataframe.py. Negative indices in .loc series selection are normalized now.
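The Python-style normalization being suggested is roughly the following; the helper is illustrative, not the actual dataframe code:

```python
def normalize_index(idx: int, length: int) -> int:
    """Normalize a possibly-negative sequence index, so that -1 refers
    to the last series, matching the other __getitem__ methods."""
    if idx < 0:
        idx += length
    if idx < 0 or idx >= length:
        raise IndexError(f"Series index {idx} out of range")
    return idx
```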

Comment on lines +567 to +568
merged = np.unique(np.concatenate(all_ts))
merged.sort()
Contributor


np.unique already returns a sorted array. The .sort() call is redundant.

Member Author


done

Comment on lines +108 to +111
if max_rows is None or n_rows <= max_rows:
    show_rows = list(range(n_rows))
else:
    show_rows = list(range(max_rows))
Contributor


When max_rows is exceeded, only the first max_rows rows are shown. In contrast, TsFileDataFrame._format_table shows head + "..." + tail. This inconsistency may confuse users expecting similar behavior from both display methods.

Member Author


done. The aligned display now uses the same head + ... + tail truncation style.
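One possible head + "..." + tail index selection is sketched below; this is a hypothetical helper, not the actual formatter:

```python
def truncated_rows(n_rows, max_rows):
    """Pick row indices to display as head + '...' + tail when the row
    count exceeds max_rows (sketch of the truncation policy)."""
    if max_rows is None or n_rows <= max_rows:
        return list(range(n_rows))
    head = (max_rows + 1) // 2       # show slightly more rows at the top
    tail = max_rows - head
    return list(range(head)) + ["..."] + list(range(n_rows - tail, n_rows))
```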

Comment on lines +837 to +846
def close(self):
    """Close all underlying readers.

    No-op for subset views (they don't own readers).
    """
    if self._is_view:
        return
    for reader in self._readers.values():
        reader.close()
    self._readers.clear()
Contributor


After close(), _readers is cleared but _series_map still holds references to closed readers. Any subsequent data access (e.g., tsdf[0][0]) will attempt to read from a closed reader, producing an unclear error.

Recommendation: Either invalidate _series_map too, or set a _closed flag and check it in data-access paths.
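A minimal sketch of the _closed-flag approach, with illustrative class and method names:

```python
class ClosableDataFrame:
    """Track a _closed flag and fail fast on access after close(),
    instead of surfacing an unclear error from a closed reader."""

    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def _check_open(self):
        if self._closed:
            raise ValueError("TsFileDataFrame is closed")

    def read(self):
        self._check_open()
        return "data"
```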
